
Create initial skeleton for an EventHubs stress test. #4250

Open
LarryOsterman wants to merge 4 commits into Azure:main from LarryOsterman:larryo/eh_stress_test
Conversation

@LarryOsterman (Member)

Not an official test, but this captures work from earlier this year and moves it into main.

Copilot AI review requested due to automatic review settings April 22, 2026 18:00
@LarryOsterman LarryOsterman requested a review from j7nw4r as a code owner April 22, 2026 18:00
Contributor

Copilot AI left a comment


Pull request overview

Adds an initial, custom (non-harness) Event Hubs stress-test runner under azure_messaging_eventhubs/tests, exposing two scenarios via a clap-driven CLI to support ad-hoc durability/throughput testing.

Changes:

  • Introduces a stress_tests integration-test entrypoint that builds a subcommand CLI and runs one/all scenarios.
  • Adds two initial scenarios: a bounded publish/read test and a long-running continuous send/receive loop with periodic reporting.
  • Updates dev-dependencies to support the CLI and .env-based configuration.

Reviewed changes

Copilot reviewed 5 out of 6 changed files in this pull request and generated 10 comments.

Show a summary per file:

  • sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests.rs: New stress-test runner with clap subcommands and a tokio main
  • sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/basic_publish_read_test.rs: Adds the bounded publish/read scenario implementation, metrics, and validation
  • sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/continuous_send_receive_stress.rs: Adds the continuous send/receive durability loop with periodic reporting
  • sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/README.md: Documents scenarios, CLI usage, and environment variables
  • sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml: Adds dev-deps for clap and dotenvy to support the stress test runner
  • Cargo.lock: Locks the new dev-dependency additions

Comment on lines +485 to +492
// Start reading from the beginning
let partition_id = format!("{}", consumer_id % 2); // Use partition 0 or 1 based on consumer ID
let receiver = consumer
    .open_receiver_on_partition(
        partition_id.clone(),
        Some(OpenReceiverOptions {
            start_position: Some(StartPosition {
                location: StartLocation::Earliest,

Copilot AI Apr 22, 2026


Consumers start at StartLocation::Earliest, which will read historical events from the partition and can satisfy expected_events without ever consuming events from this test run. For a bounded publish/read validation, start from Latest (or from an enqueued time captured just before producers start) so the consumed set corresponds to the events published by this run.

Suggested change
-// Start reading from the beginning
+// Start reading from the latest position so this test only counts events from the current run
 let partition_id = format!("{}", consumer_id % 2); // Use partition 0 or 1 based on consumer ID
 let receiver = consumer
     .open_receiver_on_partition(
         partition_id.clone(),
         Some(OpenReceiverOptions {
             start_position: Some(StartPosition {
-                location: StartLocation::Earliest,
+                location: StartLocation::Latest,

Comment on lines +451 to +462
let expected_events = config.event_count;
let consumed_events = consumed_events.clone();

let handle = tokio::spawn(async move {
    run_consumer_task(
        host,
        eventhub,
        credential,
        consumer_id,
        expected_events,
        consumed_events,
    )

Copilot AI Apr 22, 2026


expected_events is set to config.event_count for every consumer task. With multiple consumers, that makes the aggregate expected count event_count * consumer_count and can lead to unnecessary timeouts/hanging behavior (especially since consumers may share partitions). Consider dividing expected events per consumer based on partition assignment, or coordinating completion based on a shared consumed counter/map.

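The per-consumer split suggested above can be sketched with a small stdlib-only helper (`events_per_consumer` is a hypothetical name; a real partition-aware assignment would replace the even split):

```rust
/// Split `total` expected events across `consumers`, giving the first
/// `total % consumers` consumers one extra event so the counts sum to `total`.
/// This is an even-split sketch; a partition-aware assignment would weight
/// each consumer by the partitions it actually owns.
fn events_per_consumer(total: usize, consumers: usize, consumer_id: usize) -> usize {
    assert!(consumers > 0, "at least one consumer is required");
    let base = total / consumers;
    let remainder = total % consumers;
    if consumer_id < remainder {
        base + 1
    } else {
        base
    }
}

fn main() {
    // 100 events over 3 consumers: 34 + 33 + 33 = 100.
    let counts: Vec<usize> = (0..3).map(|id| events_per_consumer(100, 3, id)).collect();
    assert_eq!(counts.iter().sum::<usize>(), 100);
    println!("{:?}", counts);
}
```

With this shape, the aggregate expected count stays at `event_count` no matter how many consumer tasks are spawned.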
Comment thread on sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/README.md (outdated)
Comment on lines +33 to +36
#[tokio::main]
async fn main() {
    // Initialize tracing for test output
    tracing_subscriber::fmt()

Copilot AI Apr 22, 2026


tests/stress_tests.rs defines its own main and is intended to be a custom-harness test binary, but there is no [[test]] entry in this crate’s Cargo.toml with harness = false for the stress_tests target. As-is, this integration test will fail to compile due to the default libtest harness generating its own main. Add a [[test]] name = "stress_tests" section with harness = false (and keep the file as the entrypoint).

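The missing manifest entry described in this comment would look roughly like the following sketch against the crate's Cargo.toml (exact placement among the existing sections may differ):

```toml
# Declare stress_tests as a custom-harness test target so the file's own
# `main` is used instead of the libtest-generated entrypoint.
[[test]]
name = "stress_tests"
harness = false
```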
Comment on lines +55 to +66
// No subcommand: run all with default args
println!("Running all stress tests with default settings...");
for spec in registry() {
let default_matches = (spec.configure)(Command::new(spec.name))
.no_binary_name(true)
.get_matches_from(Vec::<&str>::new());

if let Err(e) = (spec.run)(default_matches).await {
eprintln!("{} FAILED: {}", spec.name, e);
process::exit(1);
}
}

Copilot AI Apr 22, 2026


Even with a harness = false test target, cargo test will execute this binary by default, but it requires live env vars and (by default) runs a 72h scenario. This is very likely to break local cargo test runs and CI. Consider gating the test target behind required-features (e.g., required-features = ["stress-tests"]) and adding a non-default feature, or moving this CLI into examples/ or a separate crate/binary so it never runs as part of normal tests.

Suggested change
-// No subcommand: run all with default args
-println!("Running all stress tests with default settings...");
-for spec in registry() {
-    let default_matches = (spec.configure)(Command::new(spec.name))
-        .no_binary_name(true)
-        .get_matches_from(Vec::<&str>::new());
-    if let Err(e) = (spec.run)(default_matches).await {
-        eprintln!("{} FAILED: {}", spec.name, e);
-        process::exit(1);
-    }
-}
+// No subcommand: do not run stress tests implicitly.
+let mut cmd = build_cli();
+let _ = cmd.print_help();
+eprintln!();
+eprintln!();
+eprintln!(
+    "No stress test selected. Specify a subcommand to run a stress test explicitly."
+);

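Gating the target as this comment suggests might look like the following sketch; the `stress-tests` feature name is the comment's example, not an existing feature of the crate:

```toml
[features]
# Non-default feature: `cargo test` skips the stress binary unless it is
# enabled with `--features stress-tests`.
stress-tests = []

[[test]]
name = "stress_tests"
harness = false
required-features = ["stress-tests"]
```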
Comment on lines +485 to +486
// Start reading from the beginning
let partition_id = format!("{}", consumer_id % 2); // Use partition 0 or 1 based on consumer ID

Copilot AI Apr 22, 2026


The consumer selects a partition with consumer_id % 2, assuming exactly two partitions. This will fail for Event Hubs with a different partition count (invalid partition id) and will never read from partitions beyond 0/1. Query the hub’s partition_ids (as the continuous test does) and assign consumers across the real set of partitions.

Suggested change
-// Start reading from the beginning
-let partition_id = format!("{}", consumer_id % 2); // Use partition 0 or 1 based on consumer ID
+let eventhub_properties = consumer
+    .get_eventhub_properties()
+    .await
+    .map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)?;
+if eventhub_properties.partition_ids.is_empty() {
+    return Err(Box::new(std::io::Error::new(
+        std::io::ErrorKind::Other,
+        "Event Hub has no partitions",
+    )));
+}
+// Start reading from the beginning on a real partition from the Event Hub.
+let partition_id =
+    eventhub_properties.partition_ids[consumer_id % eventhub_properties.partition_ids.len()]
+        .clone();

Comment on lines 37 to 45
azure_messaging_eventhubs = { path = ".", features = [
"in_memory_checkpoint_store",
] }
clap.workspace = true
criterion.workspace = true
dotenvy = "0.15.7"
fe2o3-amqp = { workspace = true, features = ["tracing"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }

Copilot AI Apr 22, 2026


This change adds clap/dotenvy dev-dependencies for the new stress-test CLI, but the crate is still missing a [[test]] section to declare the stress_tests integration test as harness = false (and ideally required-features = [...] to prevent it from running by default). Without that, the new tests/stress_tests.rs (which defines main) won’t compile as an integration test, and/or will run unexpectedly in CI.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 6 changed files in this pull request and generated 11 comments.

Comment on lines +38 to +47
impl Default for TestConfig {
    fn default() -> Self {
        Self {
            event_count: 1000,
            producer_count: 2,
            consumer_count: 2,
            test_timeout: Duration::from_secs(300), // 5 minutes
            event_size: 1024, // 1KB
            batch_size: 100,
        }

Copilot AI Apr 22, 2026


TestConfig::default() values (event_count=1000, producer/consumer_count=2, etc.) don’t match the clap defaults / run_standalone fallbacks (100 events, 1 producer, 1 consumer). This makes the “default config” ambiguous and can lead to confusing behavior depending on whether callers use TestConfig::default() vs CLI. Consider aligning these defaults (either update Default or the clap .default_value(...) + unwrap_or(...) values).

Comment on lines +151 to +159
let config = TestConfig {
    event_count: *matches.get_one::<u32>("events").unwrap_or(&100) as usize,
    producer_count: *matches.get_one::<u32>("producers").unwrap_or(&1) as usize,
    consumer_count: *matches.get_one::<u32>("consumers").unwrap_or(&1) as usize,
    test_timeout: std::time::Duration::from_secs(
        *matches.get_one::<u64>("timeout").unwrap_or(&120),
    ),
    event_size: *matches.get_one::<u32>("event-size").unwrap_or(&512) as usize,
    batch_size: *matches.get_one::<u32>("batch-size").unwrap_or(&10) as usize,

Copilot AI Apr 22, 2026


producer_count and batch_size are taken from CLI without validation. producer_count == 0 will panic (division by zero in start_producers), and batch_size == 0 will cause an infinite loop in run_producer_task (batch_end never advances). Add clap validation (e.g., value_parser range >=1) and/or explicit checks that return a user-friendly error before running.

Suggested change
-let config = TestConfig {
-    event_count: *matches.get_one::<u32>("events").unwrap_or(&100) as usize,
-    producer_count: *matches.get_one::<u32>("producers").unwrap_or(&1) as usize,
-    consumer_count: *matches.get_one::<u32>("consumers").unwrap_or(&1) as usize,
-    test_timeout: std::time::Duration::from_secs(
-        *matches.get_one::<u64>("timeout").unwrap_or(&120),
-    ),
-    event_size: *matches.get_one::<u32>("event-size").unwrap_or(&512) as usize,
-    batch_size: *matches.get_one::<u32>("batch-size").unwrap_or(&10) as usize,
+let event_count = *matches.get_one::<u32>("events").unwrap_or(&100) as usize;
+let producer_count = *matches.get_one::<u32>("producers").unwrap_or(&1) as usize;
+let consumer_count = *matches.get_one::<u32>("consumers").unwrap_or(&1) as usize;
+let test_timeout =
+    std::time::Duration::from_secs(*matches.get_one::<u64>("timeout").unwrap_or(&120));
+let event_size = *matches.get_one::<u32>("event-size").unwrap_or(&512) as usize;
+let batch_size = *matches.get_one::<u32>("batch-size").unwrap_or(&10) as usize;
+if producer_count == 0 {
+    return Err("`--producers` must be greater than 0".into());
+}
+if batch_size == 0 {
+    return Err("`--batch-size` must be greater than 0".into());
+}
+let config = TestConfig {
+    event_count,
+    producer_count,
+    consumer_count,
+    test_timeout,
+    event_size,
+    batch_size,
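clap can enforce the range at parse time via `value_parser!(u32).range(1..)`; the explicit-check alternative can be sketched with a stdlib-only helper (`require_positive` is a hypothetical name):

```rust
/// Reject zero-valued CLI arguments with a user-friendly error instead of
/// letting them panic later (division by zero, infinite batch loop).
fn require_positive(flag: &str, value: u32) -> Result<usize, String> {
    if value == 0 {
        Err(format!("`--{flag}` must be greater than 0"))
    } else {
        Ok(value as usize)
    }
}

fn main() {
    // A zero producer count is rejected before any tasks are spawned.
    assert!(require_positive("producers", 0).is_err());
    assert_eq!(require_positive("batch-size", 10), Ok(10));
    println!("validation ok");
}
```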
Comment on lines +191 to +193
let published_events = Arc::new(Mutex::new(HashMap::<String, String>::new()));
let consumed_events = Arc::new(Mutex::new(HashMap::<String, String>::new()));


Copilot AI Apr 22, 2026


This test uses std::sync::Mutex for published_events / consumed_events and locks it inside async tasks. In a Tokio runtime this can block the executor thread and distort stress test results (and in worst cases lead to starvation). Prefer tokio::sync::Mutex (or a concurrent map like DashMap) for data shared across async tasks.

Comment on lines +471 to +475
info!(
    "Discovered {} partitions: {:?} (consumer_count hint: {})",
    partition_ids.len(),
    partition_ids,
    config.consumer_count,

Copilot AI Apr 22, 2026


consumer_count is accepted via CLI and stored in TestConfig, but the implementation spawns one consumer per partition and only uses consumer_count for logging. This makes the CLI option misleading. Either honor consumer_count (e.g., spawn that many consumers and assign partitions) or remove the option/config field to avoid confusion.

Suggested change
info!(
"Discovered {} partitions: {:?} (consumer_count hint: {})",
partition_ids.len(),
partition_ids,
config.consumer_count,
let actual_consumer_count = partition_ids.len();
if config.consumer_count != actual_consumer_count {
return Err(Box::new(std::io::Error::other(format!(
"consumer_count ({}) must match the Event Hub partition count ({}) for this test, which creates one consumer per partition",
config.consumer_count, actual_consumer_count
))));
}
info!(
"Discovered {} partitions: {:?}; spawning {} consumers",
actual_consumer_count,
partition_ids,
actual_consumer_count,

Comment on lines +324 to +328
batch.try_add_event_data(event, None)?;

// Record it as expected now that it's in a batch
let mut missing = state.missing_events.lock().await;
missing.insert(key, expected);

Copilot AI Apr 22, 2026


When rolling over to a new batch, the result of batch.try_add_event_data(event, None)? is ignored. If an individual event can’t fit into an empty batch, try_add_event_data returns false and the event is silently not sent, but it is still inserted into missing_events and will be reported as lost. Handle the false case explicitly (return error / increment failure metric) before recording it as expected.

Suggested change
-batch.try_add_event_data(event, None)?;
-// Record it as expected now that it's in a batch
-let mut missing = state.missing_events.lock().await;
-missing.insert(key, expected);
+match batch.try_add_event_data(event, None)? {
+    true => {
+        // Record it as expected now that it's in a batch
+        let mut missing = state.missing_events.lock().await;
+        missing.insert(key, expected);
+    }
+    false => {
+        return Err(std::io::Error::other(format!(
+            "event does not fit into an empty batch: batch_index={}, event_index={}, key={}",
+            expected.batch_index, expected.index, key
+        ))
+        .into());
+    }
+}

Comment on lines +99 to +117
let end_instant = Instant::now() + config.duration;
let sender = tokio::spawn(send_loop(
    producer.clone(),
    state.clone(),
    config.clone(),
    end_instant,
));

let mut receiver_handles = Vec::new();
for partition_id in partition_ids {
    let state = state.clone();
    let host = host.clone();
    let eventhub = eventhub.clone();
    let credential = credential.clone();

    receiver_handles.push(tokio::spawn(async move {
        receive_loop(host, eventhub, credential, partition_id, state, end_instant).await
    }));
}

Copilot AI Apr 22, 2026


send_loop is spawned before receivers are opened, but receivers default to StartLocation::Latest. Any events sent before a given partition receiver attaches will never be observed, yet they’ve already been added to missing_events and will be counted as lost/corrupted. Start receivers before the sender and/or use an EnqueuedTime start position captured at test start so the receiver includes all events produced by this run.

Comment on lines +296 to +305
match batch.try_add_event_data(event, None)? {
    true => {
        // Event was successfully added, record it as expected
        let mut missing = state.missing_events.lock().await;
        missing.insert(key, expected);
    }
    false => {
        // Batch is full, send it and create a new one
        producer.send_batch(batch, None).await?;

Copilot AI Apr 22, 2026


missing_events tracks every sent event body until it’s received, and entries are never aged out. With a 72h default duration (and if receivers fall behind or errors occur), this map can grow without bound and cause excessive memory usage/OOM. Consider bounding it (e.g., per-partition watermarks, sampling, periodic pruning) or switching to metrics that don’t require storing every outstanding event key.

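The periodic-pruning option can be sketched with a timestamp per outstanding entry (stdlib-only; `prune_older_than` and the tuple value layout are illustrative, not the test's actual types):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Drop outstanding entries older than `max_age`, returning how many were
/// pruned so the caller can count them as lost instead of holding every
/// unreceived event body for the full run.
fn prune_older_than(
    missing: &mut HashMap<String, (String, Instant)>,
    max_age: Duration,
    now: Instant,
) -> usize {
    let before = missing.len();
    missing.retain(|_, (_, sent_at)| now.duration_since(*sent_at) <= max_age);
    before - missing.len()
}

fn main() {
    let start = Instant::now();
    let mut missing = HashMap::new();
    missing.insert("old".to_string(), ("body".to_string(), start));
    let later = start + Duration::from_secs(600);
    missing.insert("fresh".to_string(), ("body".to_string(), later));
    // With a 5-minute age limit, only the stale entry is dropped.
    let pruned = prune_older_than(&mut missing, Duration::from_secs(300), later);
    assert_eq!(pruned, 1);
    assert!(missing.contains_key("fresh"));
    println!("pruned {pruned} stale entries");
}
```

Called from the periodic-reporting tick, this bounds the map at roughly one age window of outstanding events.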
Comment on lines +55 to +66
// No subcommand: run all with default args
println!("Running all stress tests with default settings...");
for spec in registry() {
    let default_matches = (spec.configure)(Command::new(spec.name))
        .no_binary_name(true)
        .get_matches_from(Vec::<&str>::new());

    if let Err(e) = (spec.run)(default_matches).await {
        eprintln!("{} FAILED: {}", spec.name, e);
        process::exit(1);
    }
}

Copilot AI Apr 22, 2026


When no subcommand is provided, main runs all stress tests with default args. With continuous_send_receive_stress defaulting to 72 hours, cargo test --test stress_tests can unexpectedly run for days. Consider requiring an explicit subcommand (or using a much shorter default duration when invoked via the “run all” path) to avoid accidental long-running executions.

Comment on lines +296 to +305
match batch.try_add_event_data(event, None)? {
    true => {
        // Event was successfully added, record it as expected
        let mut missing = state.missing_events.lock().await;
        missing.insert(key, expected);
    }
    false => {
        // Batch is full, send it and create a new one
        producer.send_batch(batch, None).await?;

Copilot AI Apr 22, 2026


If try_add_event_data(...) returns false on an empty batch (which happens when a single event exceeds the max batch size), this code goes into the “batch is full” path and calls send_batch(batch, ...) on an empty EventDataBatch. EventDataBatch::get_messages() expects a batch envelope and will panic for empty batches. Detect the “event too large for an empty batch” case and return an error instead of trying to send the empty batch.

Copilot uses AI. Check for mistakes.
Comment on lines +223 to +229
let config = ContinuousStressConfig {
    duration: Duration::from_secs(duration_hours * 60 * 60),
    min_batch_size: min_batch_size.min(max_batch_size),
    max_batch_size: max_batch_size.max(min_batch_size),
    min_delay_secs: min_delay.min(max_delay),
    max_delay_secs: max_delay.max(min_delay),
};

Copilot AI Apr 22, 2026


min_batch_size can be set to 0 via CLI (u32 parser allows it). That can lead to batch_size == 0, and the loop will attempt to send_batch with an empty EventDataBatch (panic) and will also skew metrics. Consider enforcing min-batch >= 1/max-batch >= 1 at the clap layer (or return an error when batch_size resolves to 0).

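The normalize-and-guard approach can be sketched as a stdlib-only helper mirroring the min/max swap above (`resolve_batch_range` is a hypothetical name):

```rust
/// Normalize a possibly-swapped batch-size range and reject a zero minimum,
/// which would otherwise produce empty batches downstream.
fn resolve_batch_range(min: u32, max: u32) -> Result<(u32, u32), String> {
    let (lo, hi) = (min.min(max), min.max(max));
    if lo == 0 {
        return Err("`--min-batch` must be at least 1".to_string());
    }
    Ok((lo, hi))
}

fn main() {
    // Swapped bounds are normalized; a zero minimum is rejected.
    assert_eq!(resolve_batch_range(50, 10), Ok((10, 50)));
    assert!(resolve_batch_range(0, 10).is_err());
    println!("range validation ok");
}
```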
